\begin{aosachapter}{Riak and Erlang/OTP}{s:riak}{Francesco Cesarini, Andy Gross, and Justin Sheehy}

Riak is a distributed, fault-tolerant, open source database that
illustrates how to build large scale systems using Erlang/OTP\@.  Thanks
in large part to Erlang's support for massively scalable distributed
systems, Riak offers features that are uncommon in databases, such as
high availability and linear scalability of both capacity and
throughput.

Erlang/OTP provides an ideal platform for developing systems like Riak
because it provides inter-node communication, message queues, failure
detectors, and client-server abstractions out of the box.  What's
more, the most frequently used patterns in Erlang have been implemented
in library modules, commonly referred to as OTP behaviors. They
contain the generic code framework for concurrency and error handling,
simplifying concurrent programming and protecting
the developer from many common pitfalls. Behaviors are monitored by
supervisors, themselves a behavior, and grouped together in
supervision trees. A supervision tree is packaged in an application,
creating a building block of an Erlang program.

A complete Erlang system such as Riak is a set of loosely coupled
applications that interact with each other. Some of these applications
have been written by the developer, some are part of the standard
Erlang/OTP distribution, and some may be other open source
components. They are sequentially loaded and started by a boot script
generated from a list of applications and versions.

What \emph{differs} among systems are the applications that are part
of the release which is started. In the standard Erlang distribution,
the boot files will start the \emph{Kernel} and \emph{StdLib}
(Standard Library) applications. In some installations, the
\emph{SASL} (Systems Architecture Support Library) application is also
started. SASL contains release and software upgrade tools together
with logging capabilities. Riak is no different, other than also
starting the Riak-specific applications and their runtime dependencies,
which include \emph{Kernel}, \emph{StdLib}, and \emph{SASL}. A complete and
ready-to-run build of Riak actually embeds these standard elements of
the Erlang/OTP distribution and starts them all in unison when
\code{riak start} is invoked on the command line. Riak consists of
many complex applications, so this chapter should not be
interpreted as a complete guide. It should be seen as an introduction
to OTP where examples from the Riak source code are used. The figures
and examples have been abbreviated and shortened for demonstration
purposes.

\begin{aosasect1}{An Abridged Introduction to Erlang}

Erlang is a concurrent functional programming language that compiles
to byte code and runs in a virtual machine. Programs consist of
functions that call each other, often resulting in side effects such
as inter-process message passing, I/O and database operations. Erlang
variables are single assignment, i.e., once they have been given
values, they cannot be updated.  The language makes extensive use of
pattern matching, as shown in the factorial example below:

\begin{verbatim}
-module(factorial).
-export([fac/1]).
fac(0) -> 1;
fac(N) when N>0 ->
   Prev = fac(N-1),
   N*Prev.
\end{verbatim}

\noindent Here, the first clause gives the factorial of zero and the
second computes the factorial of a positive number. The body of each clause is a sequence
of expressions, and the final expression in the body is the result of
that clause. Calling the function with a negative number will result
in a run time error, as none of the clauses match. Not handling this
case is an example of non-defensive programming, a practice encouraged
in Erlang.

Within the module, functions are called in the usual way; outside, the
name of the module is prepended, as in \code{factorial:fac(3)}. It
is possible to define functions with the same name but different
numbers of arguments; the number of arguments a function takes is
called its \emph{arity}. In the export directive in the
\code{factorial} module, the \code{fac} function of arity one is
denoted by \code{fac/1}.

Erlang supports tuples (also called product types) and lists. Tuples
are enclosed in curly brackets, as in \code{\{ok,37\}}. In tuples, we
access elements by position. Records are another data type; they allow
us to store a fixed number of elements which are then accessed and
manipulated by name. We define a record with a declaration such as
\code{-record(state, \{id, msg\_list=[]\}).}  To create an instance, we use the
expression \code{Var = \#state\{id=1\}}, and we examine its contents
using \code{Var\#state.id}.  For a variable number of elements, we use
lists defined in square brackets, such as \code{{[}23,34{]}}. The
notation \code{{[}X|Xs{]}} matches a non-empty list with head \code{X}
and tail \code{Xs}. Identifiers beginning with a lowercase letter
denote atoms, which simply stand for themselves; the \code{ok} in the
tuple \code{\{ok,37\}} is an example of an atom. Atoms are often used
in this way to distinguish between different kinds of function
result: as well as \code{ok} results, there might be results of the
form \code{\{error, "Error String"\}}.
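To make these data types concrete, here is a minimal sketch in a
hypothetical \code{types\_demo} module (the module and function names
are ours, not Riak's). It reuses the \code{state} record declared
above, matches the head of a list with \code{{[}X|\_{]}}, and tags
results with the atoms \code{ok} and \code{error}:

\begin{verbatim}
%% Hypothetical module illustrating tuples, records, lists and atoms.
-module(types_demo).
-export([new_state/1, get_id/1, add_msg/2, first/1, safe_div/2]).

%% The record from the text: id defaults to undefined, msg_list to [].
-record(state, {id, msg_list=[]}).

%% Create a record instance with the id field set.
new_state(Id) ->
    #state{id=Id}.

%% Access a field by name.
get_id(Var) ->
    Var#state.id.

%% Records are immutable: "updating" one returns a modified copy.
add_msg(Msg, State) ->
    State#state{msg_list=[Msg | State#state.msg_list]}.

%% [X|_] matches a non-empty list and binds its head to X.
first([X|_]) -> {ok, X};
first([])    -> {error, "Empty list"}.

%% Atoms tag the kind of result returned to the caller.
safe_div(_A, 0) -> {error, "Division by zero"};
safe_div(A, B)  -> {ok, A div B}.
\end{verbatim}

\noindent Because records are compiled into tuples whose first element
is the record name, \code{types\_demo:new\_state(1)} returns the tuple
\code{\{state,1,{[}{]}\}}.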

Processes in Erlang systems run concurrently in separate memory, and
communicate with each other by message passing. Processes can serve a
wealth of purposes, such as acting as gateways to databases, handling
protocol stacks, and managing the logging of trace messages from other
processes. Although these processes handle different requests, there
will be similarities in how these requests are handled.

As processes exist only within the virtual machine, a single VM can
simultaneously run millions of processes, a feature Riak exploits
extensively. For example, each request to the database---reads, writes, and
deletes---is modeled as a separate process, an approach that would
not be possible with most OS-level threading implementations.

Processes are identified by process identifiers, called PIDs, but
they can also be registered under an alias; this should only be used
for long-lived ``static'' processes. Registering a process with its
alias allows other processes to send it messages without knowing its
PID. Processes are created using the
\code{spawn(Module, Function, Arguments)} built-in function (BIF).
BIFs are functions built into the VM, used for operations that are
impossible or too slow to implement in pure Erlang. The \code{spawn/3}
BIF takes a \code{Module}, a \code{Function} and a list of
\code{Arguments} as parameters. As a side effect, the call creates a
new process that starts executing \code{Function} in \code{Module}
with the given \code{Arguments}; the call then returns the PID of that
newly spawned process.
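The following sketch shows \code{spawn/3} and registration together.
The module \code{spawn\_demo} and the alias \code{counter\_demo} are
hypothetical. The function passed to \code{spawn/3} must be exported,
since it is called from outside the module, and \code{register/2}
raises a \code{badarg} exception if the alias is already taken:

\begin{verbatim}
-module(spawn_demo).
-export([start/0, loop/1]).

%% Spawn a process executing spawn_demo:loop(0) and register it
%% under the alias counter_demo. Returns the new process's PID.
start() ->
    Pid = spawn(spawn_demo, loop, [0]),
    true = register(counter_demo, Pid),
    Pid.

%% Must be exported: spawn/3 calls it as spawn_demo:loop/1.
%% Replies to the sender with the incremented count.
loop(Count) ->
    receive
        {increment, From} ->
            From ! {count, Count + 1},
            loop(Count + 1)
    end.
\end{verbatim}

\noindent Once registered, any process on the node can send messages
to \code{counter\_demo} without knowing its PID, and
\code{whereis(counter\_demo)} returns the same PID that
\code{start/0} returned.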

A message \code{Msg} is sent to a process with process id \code{Pid}
using \code{Pid ! Msg}. A process can find out its PID by calling the
BIF \code{self}, and this can then be sent to other processes for them
to use to communicate with the original process. Suppose that a
process expects to receive messages of the form \code{\{ok, N\}} and
\code{\{error, Reason\}}. To process these it uses a receive
statement:

\begin{verbatim}
receive
   {ok, N} ->
      N+1;
   {error, _} ->
      0
end
\end{verbatim}

\noindent The result of this is a number determined by the pattern-matched
clause. When the value of a variable is not needed in the pattern
match, the underscore wild-card can be used as shown above.

Message passing between processes is asynchronous, and the messages
received by a process are placed in the process's mailbox in the order
in which they arrive. Suppose now that the \code{receive} expression
above is executed: if the first message in the mailbox matches either
\code{\{ok, N\}} or \code{\{error, Reason\}}, the corresponding result
will be returned. If the first message in the mailbox is not of this
form, it is retained in the mailbox and the second is processed in a
similar way. If no message matches, the receive will wait for a
matching message to be received.
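A small experiment makes this scanning behavior visible. In the
hypothetical module below, a process sends three messages to itself,
then runs the \code{receive} expression from the text twice. The atom
\code{hello} matches neither pattern, so it is skipped both times and
stays in the mailbox until a catch-all \code{receive} removes it:

\begin{verbatim}
-module(mailbox_demo).
-export([run/0]).

run() ->
    self() ! hello,            % matches neither pattern below
    self() ! {ok, 41},
    self() ! {error, oops},
    %% Skips 'hello', matches {ok, 41}.
    First = receive
                {ok, N} -> N + 1;
                {error, _} -> 0
            end,
    %% Skips 'hello' again, matches {error, oops}.
    Second = receive
                 {ok, M} -> M + 1;
                 {error, _} -> 0
             end,
    %% 'hello' is still in the mailbox; a catch-all pattern takes it.
    Third = receive Other -> Other after 0 -> empty end,
    {First, Second, Third}.
\end{verbatim}

\noindent Called from a process with an otherwise empty mailbox,
\code{mailbox\_demo:run()} returns \code{\{42, 0, hello\}}.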

A process terminates for one of two reasons. If there is no more code
to execute, it is said to terminate with reason \emph{normal}. If a
process encounters a run-time error, it is said to terminate with a
\emph{non-normal} reason. A process terminating will not affect other
processes unless they are linked to it. Processes can link to each
other through the \code{link(Pid)} BIF or by calling the
\code{spawn\_link(Module, Function, Arguments)} BIF. When a process
terminates, it sends an EXIT signal to the processes in its link set.
If the termination reason is non-normal, each linked process that
receives the signal terminates as well, propagating the EXIT signal
further. By calling the \code{process\_flag(trap\_exit, true)} BIF,
processes can receive EXIT signals as Erlang messages in their mailbox
instead of terminating.
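The sketch below, in a hypothetical \code{link\_demo} module, traps
exits and then links to a child process via \code{spawn\_link/3}.
Because exits are trapped, the child's EXIT signal arrives as an
ordinary \code{\{'EXIT', Pid, Reason\}} message, whether the child
terminated with reason \code{normal} or with a non-normal reason such
as \code{boom}:

\begin{verbatim}
-module(link_demo).
-export([run/1, crash/0, finish/0]).

%% Terminates with the non-normal reason 'boom'.
crash() -> exit(boom).

%% Runs out of code, so terminates with reason 'normal'.
finish() -> ok.

%% FunName is crash or finish.
run(FunName) ->
    %% EXIT signals now arrive as messages instead of killing us.
    process_flag(trap_exit, true),
    Pid = spawn_link(link_demo, FunName, []),
    receive
        {'EXIT', Pid, Reason} -> {exited, Reason}
    after 1000 ->
        timeout
    end.
\end{verbatim}

\noindent Note that \code{process\_flag/2} changes the calling process
permanently: after \code{run/1} returns, the caller keeps trapping
exits until it resets the flag.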

Riak uses EXIT signals to monitor the well-being of helper processes
performing non-critical work initiated by the request-driving finite
state machines. When these helper processes terminate abnormally, the
EXIT signal allows the parent to either ignore the error or restart
the process.

\end{aosasect1}

\begin{aosasect1}{Process Skeletons}

We previously introduced the notion that processes follow a common
pattern regardless of the particular purpose for which the process was
created. To start off, a process has to be spawned and then,
optionally, have its alias registered. The first action of the newly
spawned process is to initialize the process loop data. The loop data
is often the result of arguments passed to the \code{spawn} built-in
function at the initialization of the process. Its loop data is stored
in a variable we refer to as the process state. The state, often
stored in a record, is passed to a receive-evaluate function, running
a loop which receives a message, handles it, updates the state, and
passes it back as an argument to a tail-recursive call. If one of the
messages it handles is a `stop' message, the receiving process will
clean up after itself and then terminate.
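The pattern just described can be written out by hand. The sketch
below, in a hypothetical \code{proc\_skeleton} module, spawns a process
whose \code{init/1} function builds the loop data, runs a
tail-recursive receive-evaluate loop, and cleans up when it receives a
\code{stop} message. The client functions \code{read/1},
\code{write/2}, and \code{stop/1} are our own illustrative names:

\begin{verbatim}
-module(proc_skeleton).
-export([start/1, init/1, read/1, write/2, stop/1]).

-record(state, {data}).

%% Spawn the process; init/1 runs inside the new process.
start(Args) ->
    spawn(proc_skeleton, init, [Args]).

%% Initialize the loop data, then enter the receive-evaluate loop.
init(Args) ->
    loop(#state{data=Args}).

%% Receive a message, handle it, and recurse with the (new) state.
loop(State) ->
    receive
        {read, From} ->
            From ! {reply, State#state.data},
            loop(State);
        {write, Value} ->
            loop(State#state{data=Value});
        {stop, From} ->
            terminate(State),
            From ! {reply, stopped}   % no recursion: the process ends
    end.

%% Process-specific cleanup would go here.
terminate(_State) ->
    ok.

%% Client functions, executed in the caller's process.
read(Pid) ->
    Pid ! {read, self()},
    receive {reply, Data} -> Data end.

write(Pid, Value) ->
    Pid ! {write, Value},
    ok.

stop(Pid) ->
    Pid ! {stop, self()},
    receive {reply, Result} -> Result end.
\end{verbatim}

\noindent A client waiting for \code{\{reply, \_\}} in this sketch
cannot tell its reply apart from any other message of the same shape;
guarding against such problems is part of what OTP behaviors provide.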

This pattern recurs regardless of the task a process has been
assigned to perform. With this in mind, let's look at the differences
between the processes that conform to this pattern:

\begin{aosaitemize}

  \item The arguments passed to the \code{spawn} BIF calls will differ
  from one process to another.

  \item You have to decide whether you should register a process under
  an alias, and if you do, what alias should be used.

  \item In the function that initializes the process state, the actions
  taken will differ based on the tasks the process will perform.

  \item The state of the system is represented by the loop data in every
  case, but the contents of the loop data will vary among processes.

  \item In the body of the receive-evaluate loop, processes will
  receive different messages and handle them in different ways.

  \item Finally, on termination, the cleanup will vary from process to
  process.

\end{aosaitemize}

So, even if a skeleton of generic actions exists, these actions are
complemented by specific ones that are directly related to the tasks
assigned to the process. Using this skeleton as a template,
programmers can create Erlang processes that act as servers, finite
state machines, event handlers and supervisors. But instead of
re-implementing these patterns every time, they have been placed in
library modules referred to as behaviors. They come as part of the OTP
middleware.

\end{aosasect1}

\begin{aosasect1}{OTP Behaviors}

The core team of developers committing to Riak is spread across nearly
a dozen geographical locations.  Without very tight coordination and
templates to work from, the result would be a collection of different
client/server implementations that fail to handle special borderline
cases and concurrency-related errors. There would probably be no uniform
way to handle client and server crashes, or to guarantee that a response
to a request is indeed the response, and not just any message that
conforms to the internal message protocol.

OTP is a set of Erlang libraries and design principles providing
ready-made tools with which to develop robust systems. Many of these
patterns and libraries are provided in the form of ``behaviors.''

OTP behaviors address these issues by providing library modules that
implement the most common concurrent design patterns. Behind the
scenes, without the programmer having to be aware of it, the library
modules ensure that errors and special cases are handled in a
consistent way. As a result, OTP behaviors provide a set of
standardized building blocks used in designing and building
industrial-grade systems.

\begin{aosasect2}{Introduction}

OTP behaviors are provided as library modules in the \code{stdlib}
application which comes as part of the Erlang/OTP distribution. The
specific code, written by the programmer, is placed in a separate
module and called through a set of predefined callback functions
standardized for each behavior. This callback module will contain all
of the specific code required to deliver the desired functionality.

OTP behaviors include worker processes, which do the actual
processing, and supervisors, whose task is to monitor workers and
other supervisors. Worker behaviors, often denoted in diagrams as
circles, include servers, event handlers, and finite state
machines. Supervisors, denoted in illustrations as squares, monitor
their children, both workers and other supervisors, creating what is
called a supervision tree.

\aosafigure{../images/riak/supervision-tree.png}{OTP Riak Supervision Tree}{fig.erlang.supervision}

Supervision trees are packaged into a behavior called an
application. OTP applications are not only the building blocks of
Erlang systems, but are also a way to package reusable
components. Industrial-grade systems like Riak consist of a set of
loosely coupled, possibly distributed applications. Some of these
applications are part of the standard Erlang distribution and some are
the pieces that make up the specific functionality of Riak.

Examples of OTP applications include the CORBA ORB or the Simple
Network Management Protocol (SNMP) agent. An OTP application is a
reusable component that packages library modules together with
supervisor and worker processes. From now on, when we refer to an
application, we will mean an OTP application.

The behavior modules contain all of the generic code for each given
behavior type. Although it is possible to implement your own behavior
module, doing so is rare because the ones that come with the
Erlang/OTP distribution will cater to most of the design patterns you
would use in your code. The generic functionality provided in a
behavior module includes operations such as:

\begin{aosaitemize}

  \item spawning and possibly registering the process;

  \item sending and receiving client messages as synchronous or
  asynchronous calls, including defining the internal message protocol;

  \item storing the loop data and managing the process loop; and

  \item stopping the process.

\end{aosaitemize}

The loop data is a variable that will contain the data the behavior
needs to store in between calls. After the call, an updated variant of
the loop data is returned. This updated loop data, often referred to
as the new loop data, is passed as an argument in the next call. Loop
data is also often referred to as the behavior state.

To deliver the specific behavior required, the callback module of a
generic server must provide the following functionality:

\begin{aosaitemize}

  \item Initializing the process loop data, and, if the process is
  registered, the process name.

  \item Handling the specific client requests, and, if synchronous, the
  replies sent back to the client.

  \item Handling and updating the process loop data in between the
  process requests.

  \item Cleaning up the process loop data upon termination.

\end{aosaitemize}

\end{aosasect2}

\begin{aosasect2}{Generic Servers}

Generic servers that implement client/server behaviors are defined in
the \code{gen\_server} behavior that comes as part of the standard
library application. In explaining generic servers, we will use the
\code{riak\_core\_node\_watcher.erl} module from the \code{riak\_core}
application. It is a server that tracks and reports on which
sub-services and nodes in a Riak cluster are available. The module
headers and directives are as follows:

\begin{verbatim}
-module(riak_core_node_watcher).
-behavior(gen_server).
%% API
-export([start_link/0,service_up/2,service_down/1,node_up/0,node_down/0,services/0,
         services/1,nodes/1,avsn/0]).
%% gen_server callbacks
-export([init/1,handle_call/3,handle_cast/2,handle_info/2,terminate/2, code_change/3]).

-record(state, {status=up, services=[], peers=[], avsn=0, bcast_tref,
                bcast_mod={gen_server, abcast}}).
\end{verbatim}

We can easily recognize generic servers through the
\code{-behavior(gen\_server).} directive. The compiler uses this
directive to check that all of the callback functions the behavior
requires are exported, warning if any are missing. The \code{state}
record holds the server's loop data.

\end{aosasect2}

\begin{aosasect2}{Starting Your Server}

With the \code{gen\_server} behavior, instead of using the
\code{spawn} and \code{spawn\_link} BIFs, you will use the
\code{gen\_server:start} and \code{gen\_server:start\_link}
functions. The main difference between \code{spawn} and \code{start}
is the synchronous nature of the call. Using \code{start} instead of
\code{spawn} makes starting the worker process more deterministic and
prevents unforeseen race conditions, as the call will not return the
PID of the worker until it has been initialized. You call the
functions with either of:

\begin{verbatim}
gen_server:start_link(ServerName, CallbackModule, Arguments, Options)
gen_server:start_link(CallbackModule, Arguments, Options)
\end{verbatim}

\noindent \code{ServerName} is a tuple of the format \code{\{local, Name\}} or
\code{\{global, Name\}}, denoting a local or global \code{Name} for the
process alias if it is to be registered. Global names allow servers
to be transparently accessed across a cluster of distributed Erlang
nodes. If you do not want to register the process and instead
reference it using its PID, you omit the argument and use a
\code{start\_link/3} or \code{start/3} function call
instead. \code{CallbackModule} is the name of the module in which
the specific callback functions are placed, \code{Arguments} is a
valid Erlang term that is passed to the \code{init/1} callback
function, while \code{Options} is a list that allows you to set
process memory management flags such as \code{fullsweep\_after} and
\code{min\_heap\_size} (passed via the \code{spawn\_opt} option), as
well as tracing and debugging flags.

In our example, we call \code{start\_link/4}, registering the process
with the same name as the callback module, using the \code{?MODULE}
macro. When the code is compiled, the preprocessor expands this macro
to the name of the module in which it is used. It is always good
practice to register your behavior under an alias that is the same as
the name of the callback module it is implemented in. As we have no
arguments to pass, we send the empty list. The options list is also
kept empty:

\begin{verbatim}
start_link() ->
    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
\end{verbatim}

\noindent The obvious difference between the \code{start\_link} and \code{start}
functions is that \code{start\_link} links to its parent, most often a
supervisor, while \code{start} doesn't. This needs a special mention
as it is an OTP behavior's responsibility to link itself to the
supervisor. The \code{start} functions are often used when testing
behaviors from the shell, as a typing error causing the shell process
to crash would not affect the behavior. When successful, all variants
of the \code{start} and \code{start\_link} functions return \code{\{ok, Pid\}}.

The \code{start} and \code{start\_link} functions will spawn a new
process that calls the \code{init(Arguments)} callback function in the
\code{CallbackModule}, with the \code{Arguments} supplied. The
\code{init} function must initialize the \code{LoopData} of the server
and has to return a tuple of the format \code{\{ok,
LoopData\}}. \code{LoopData} contains the first instance of the loop
data that will be passed between the callback functions. If you want
to store some of the arguments you passed to the \code{init} function, you
would do so in the \code{LoopData} variable. The \code{LoopData} in
the Riak node watcher server is the result of calling
\code{schedule\_broadcast/1} with a record of type \code{state}
whose fields are set to their default values:

\begin{verbatim}
init([]) ->

    %% Watch for node up/down events
    net_kernel:monitor_nodes(true),

    %% Setup ETS table to track node status
    ets:new(?MODULE, [protected, named_table]),

    {ok, schedule_broadcast(#state{})}.
\end{verbatim}

Although the supervisor process might call the \code{start\_link/4}
function, a different process calls the \code{init/1} callback: the
one that was just spawned.  As the purpose of this server is to
notice, record, and broadcast the availability of sub-services within
Riak, the initialization asks the Erlang runtime to notify it of such
events, and sets up a table to store this information in.  This needs
to be done during initialization, as any calls to the server would
fail if that structure did not yet exist. Do only what is necessary
and minimize the operations in your \code{init} function, as the call
to \code{init} is a synchronous call that prevents all of the other
serialized processes from starting until it returns.
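The following sketch puts the pieces of this section together in a
complete, if trivial, callback module. The module name
\code{init\_demo} and its \code{lookup/1} API are hypothetical. Like
the node watcher, it registers itself under \code{?MODULE} and creates
a named ETS table in \code{init/1}; it also records \code{self()} in
that table, which shows that \code{init/1} runs in the newly spawned
server process rather than in the caller:

\begin{verbatim}
-module(init_demo).
-behavior(gen_server).
-export([start_link/0, start/0, lookup/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
         terminate/2, code_change/3]).

-record(state, {table}).

%% Linked start, as a supervisor would call it.
start_link() ->
    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).

%% Unlinked start, convenient when experimenting in the shell.
start() ->
    gen_server:start({local, ?MODULE}, ?MODULE, [], []).

lookup(Key) ->
    gen_server:call(?MODULE, {lookup, Key}).

%% Executed by the new server process; start/0 and start_link/0
%% do not return until this function has returned.
init([]) ->
    Tab = ets:new(?MODULE, [protected, named_table]),
    true = ets:insert(Tab, {server_pid, self()}),
    {ok, #state{table=Tab}}.

handle_call({lookup, Key}, _From, State) ->
    {reply, ets:lookup(State#state.table, Key), State}.

handle_cast(_Msg, State) -> {noreply, State}.
handle_info(_Info, State) -> {noreply, State}.
terminate(_Reason, _State) -> ok.
code_change(_OldVsn, State, _Extra) -> {ok, State}.
\end{verbatim}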

\end{aosasect2}

\begin{aosasect2}{Passing Messages}

If you want to send a synchronous message to your server, you use the
\code{gen\_server:call/2} function. Asynchronous calls are made using
the \code{gen\_server:cast/2} function. Let's start by taking two
functions from Riak's service API; we will provide the rest of the
code later. They are called by the client process and result in a
synchronous message being sent to the server process registered with
the same name as the callback module. Note that validating the data
sent to the server should occur on the client side. If the client
sends incorrect information, the server should terminate.

\begin{verbatim}
service_up(Id, Pid) ->
    gen_server:call(?MODULE, {service_up, Id, Pid}).

service_down(Id) ->
    gen_server:call(?MODULE, {service_down, Id}).
\end{verbatim}

\noindent Upon receiving the messages, the \code{gen\_server} process calls the
\code{handle\_call/3} callback function dealing with the messages in
the same order in which they were sent:

\begin{verbatim}
handle_call({service_up, Id, Pid}, _From, State) ->
    %% Update the set of active services locally
    Services = ordsets:add_element(Id, State#state.services),
    S2 = State#state { services = Services },

    %% Remove any existing mrefs for this service
    delete_service_mref(Id),

    %% Setup a monitor for the Pid representing this service
    Mref = erlang:monitor(process, Pid),
    erlang:put(Mref, Id),
    erlang:put(Id, Mref),

    %% Update our local ETS table and broadcast
    S3 = local_update(S2),
    {reply, ok, update_avsn(S3)};

handle_call({service_down, Id}, _From, State) ->
    %% Update the set of active services locally
    Services = ordsets:del_element(Id, State#state.services),
    S2 = State#state { services = Services },

    %% Remove any existing mrefs for this service
    delete_service_mref(Id),

    %% Update local ETS table and broadcast
    S3 = local_update(S2),
    {reply, ok, update_avsn(S3)};
\end{verbatim}

\noindent Note the return value of the callback function. The tuple contains the
control atom \code{reply}, telling the \code{gen\_server} generic code
that the second element of the tuple (which in both of these cases is
the atom \code{ok}) is the reply sent back to the client. The third
element of the tuple is the new \code{State}, which, in a new
iteration of the server, is passed as the third argument to the
\code{handle\_call/3} function; in both cases here it is updated to
reflect the new set of available services. The argument \code{\_From}
is a tuple containing a unique message reference and the client
process identifier. The tuple as a whole is used in library functions
that we will not be discussing in this chapter. In the majority of
cases, you will not need it.

The \code{gen\_server} library module has a number of mechanisms and
safeguards built in that operate behind the scenes. If your client
sends a synchronous message to your server and does not get a
response within five seconds, \code{call/2} exits with a timeout,
terminating the calling process unless the exit is caught. You can
override the default by using
\code{gen\_server:call(Name, Message, Timeout)} where \code{Timeout}
is a value in milliseconds or the atom \code{infinity}.
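To see the timeout in action, the hypothetical \code{timeout\_demo}
server below sleeps for a requested number of milliseconds before
replying, while its client waits at most 100 milliseconds. Rather than
letting the timeout terminate the caller, \code{slow\_call/1} catches
the \code{\{timeout, \_\}} exit and returns the atom
\code{timed\_out}; whether catching it is appropriate depends on the
application:

\begin{verbatim}
-module(timeout_demo).
-behavior(gen_server).
-export([start/0, slow_call/1]).
-export([init/1, handle_call/3, handle_cast/2]).

start() ->
    gen_server:start({local, ?MODULE}, ?MODULE, [], []).

%% Ask the server to sleep Ms milliseconds; wait at most 100 ms.
slow_call(Ms) ->
    try gen_server:call(?MODULE, {sleep, Ms}, 100) of
        Reply -> Reply
    catch
        exit:{timeout, _} -> timed_out
    end.

init([]) -> {ok, no_state}.

handle_call({sleep, Ms}, _From, State) ->
    timer:sleep(Ms),
    {reply, done, State}.

handle_cast(_Msg, State) -> {noreply, State}.
%% Remaining callbacks omitted for brevity.
\end{verbatim}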

The timeout mechanism was originally put in place for deadlock
prevention purposes, ensuring that servers that accidentally call each
other are terminated after the default timeout. The crash report would
be logged, and hopefully would result in the error being debugged and
fixed. Most applications will function appropriately with a timeout of
five seconds, but under very heavy loads, you might have to fine-tune
the value and possibly even use \code{infinity}; this choice is
application-dependent. All of the critical code in Erlang/OTP uses
\code{infinity}.  Various places in Riak use different values for the
timeout: \code{infinity} is common between coupled pieces of the
internals, while \code{Timeout} is set based on a user-passed
parameter in cases where the client code talking to Riak has specified
that an operation should be allowed to time out.

Other safeguards when using the \code{gen\_server:call/2} function
include the case of sending a message to a nonexistent server and
the case of a server crashing before sending its reply. In
both cases, the calling process will terminate. In raw Erlang, sending
a message that is never pattern-matched in a receive clause is a bug
that can cause a memory leak. Two different strategies are used in
Riak to mitigate this, both of which involve ``catchall'' matching
clauses.  In places where the message might be user-initiated, an
unmatched message might be silently discarded.  In places where such a
message could only come from Riak's internals, it represents a bug and
so will be used to trigger an error-alerting internal crash report,
restarting the worker process that received it.

Sending asynchronous messages works in a similar way. Messages are
sent asynchronously to the generic server and handled in the
\code{handle\_cast/2} callback function. The function has to return a
tuple of the format \code{\{noreply, NewState\}}. Asynchronous calls are
used when we are not interested in the response from the server and are
not worried about producing more messages than the server can
consume. In cases where we are not interested in a response but want
to wait until the message has been handled before sending the next
request, we would use \code{gen\_server:call/2}, returning the atom
\code{ok} in the reply. Picture a process generating database entries
at a faster rate than Riak can consume. By using asynchronous calls,
we risk filling up the process mailbox and making the node run out of
memory.  Riak uses the message-serializing properties of synchronous
\code{gen\_server} calls to regulate load, processing the next request
only when the previous one has been handled.  This approach eliminates
the need for more complex throttling code: in addition to enabling
concurrency, \code{gen\_server} processes can also be used to
introduce serialization points.
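The difference can be sketched with a hypothetical
\code{throttle\_demo} server that counts the entries written to it.
\code{write\_async/1} uses \code{gen\_server:cast/2} and returns
\code{ok} at once, whether or not the server has caught up;
\code{write\_sync/1} uses \code{gen\_server:call/2} and returns only
once the server has handled the entry, so a producer calling it in a
loop can never outrun the server:

\begin{verbatim}
-module(throttle_demo).
-behavior(gen_server).
-export([start/0, write_async/1, write_sync/1, count/0]).
-export([init/1, handle_call/3, handle_cast/2]).

start() ->
    gen_server:start({local, ?MODULE}, ?MODULE, [], []).

%% Fire-and-forget: returns ok immediately, even if the server is busy.
write_async(Entry) -> gen_server:cast(?MODULE, {write, Entry}).

%% Blocks until the server has processed the entry (back-pressure).
write_sync(Entry) -> gen_server:call(?MODULE, {write, Entry}).

count() -> gen_server:call(?MODULE, count).

%% Loop data: number of entries written so far.
init([]) -> {ok, 0}.

handle_call({write, _Entry}, _From, N) -> {reply, ok, N + 1};
handle_call(count, _From, N)           -> {reply, N, N}.

%% Casts return {noreply, NewState}: there is no one to reply to.
handle_cast({write, _Entry}, N) -> {noreply, N + 1}.
\end{verbatim}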

\end{aosasect2}

\begin{aosasect2}{Stopping the Server}

How do you stop the server? In your \code{handle\_call/3} and
\code{handle\_cast/2} callback functions, instead of returning
\code{\{reply, Reply, NewState\}} or \code{\{noreply, NewState\}}, you
can return \code{\{stop, Reason, Reply, NewState\}} or \code{\{stop,
Reason, NewState\}}, respectively. Something has to trigger this
return value, often a stop message sent to the server. Upon
receiving the stop tuple containing the \code{Reason} and
\code{State}, the generic code executes the \code{terminate(Reason,
State)} callback.

The \code{terminate} function is the natural place to insert the code
needed to clean up the \code{State} of the server and any other
persistent data used by the system. In our example, we send out one
last message to our peers so that they know that this node watcher is
no longer up and watching. In this example, the variable \code{State}
contains a record with the fields \code{status} and \code{peers}:

\begin{verbatim}
terminate(_Reason, State) ->
    %% Let our peers know that we are shutting down
    broadcast(State#state.peers, State#state { status = down }).
\end{verbatim}
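The hypothetical \code{stop\_demo} server below shows the whole
sequence in miniature. Its \code{handle\_call/3} returns
\code{\{stop, normal, stopped, State\}} in response to a \code{stop}
request, so the client receives the reply \code{stopped} and
\code{terminate/2} runs with reason \code{normal}. Standing in for the
node watcher's peers, a process passed in at start-up is notified when
\code{terminate/2} runs:

\begin{verbatim}
-module(stop_demo).
-behavior(gen_server).
-export([start/1, stop/0]).
-export([init/1, handle_call/3, handle_cast/2, terminate/2]).

%% Observer: a pid told when terminate/2 runs.
start(Observer) ->
    gen_server:start({local, ?MODULE}, ?MODULE, Observer, []).

stop() -> gen_server:call(?MODULE, stop).

init(Observer) -> {ok, Observer}.

%% {stop, Reason, Reply, NewState}: reply to the caller and terminate.
handle_call(stop, _From, Observer) ->
    {stop, normal, stopped, Observer}.

handle_cast(_Msg, State) -> {noreply, State}.

%% Cleanup: tell the observer that we are going down.
terminate(Reason, Observer) ->
    Observer ! {terminated, Reason},
    ok.
\end{verbatim}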

Using behavior callbacks as library functions, invoking them directly
from other parts of your program, is extremely bad practice. For
example, you should never call
\path{riak_core_node_watcher:init(Args)} from another module to
retrieve the initial loop data. Such retrievals should be done through
a synchronous call to the server. Calls to behavior callback functions
should originate only from the behavior library modules as a result of
an event occurring in the system, and never directly by the user.

\end{aosasect2}

\end{aosasect1}

\begin{aosasect1}{Other Worker Behaviors}

A large number of other worker behaviors can be and have been
implemented using these same ideas.

\begin{aosasect2}{Finite State Machines}

Finite state machines (FSMs), implemented in the \code{gen\_fsm} behavior
module, are a crucial component when implementing protocol stacks in
telecom systems (the problem domain Erlang was originally invented
for). States are defined as callback functions, each named after the
state it implements, that return a tuple containing the name of the
next state and the updated loop data. You can send events to these
states synchronously and
asynchronously. The finite state machine callback module should also
export the standard callback functions such as \code{init},
\code{terminate}, and \code{handle\_info}.

Of course, finite state machines are not telecom specific. In Riak,
they are used in the request handlers. When a client issues a request
such as \code{get}, \code{put}, or \code{delete}, the process
listening to that request will spawn a process implementing the
corresponding \code{gen\_fsm} behavior. For instance, the
\code{riak\_kv\_get\_fsm} is responsible for handling a \code{get}
request, retrieving data and sending it out to the client process. The
FSM process will pass through various states as it determines which
nodes to ask for the data, as it sends out messages to those nodes, and as
it receives data, errors, or timeouts in response.
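As of Erlang/OTP 20, \code{gen\_fsm} is deprecated in favor of
\code{gen\_statem}, which keeps the idea of one callback function per
state. The sketch below uses \code{gen\_statem} in its
\code{state\_functions} callback mode to model, very loosely, the
shape of a \code{get} request FSM: it stays in state \code{waiting}
until \code{R} replies have arrived, then moves to state
\code{finished}. The module name, states, and API are hypothetical and
far simpler than \code{riak\_kv\_get\_fsm}:

\begin{verbatim}
-module(get_fsm_sketch).
-behavior(gen_statem).
-export([start/1, reply/2, result/1]).
-export([init/1, callback_mode/0, waiting/3, finished/3]).

start(R) -> gen_statem:start(?MODULE, R, []).

%% A node delivers a value (asynchronous event).
reply(Pid, Value) -> gen_statem:cast(Pid, {reply, Value}).

%% A client asks for the outcome (synchronous event).
result(Pid) -> gen_statem:call(Pid, result).

%% Each state is a callback function named after the state.
callback_mode() -> state_functions.

%% Initial state 'waiting'; loop data is {R, RepliesSoFar}.
init(R) -> {ok, waiting, {R, []}}.

waiting(cast, {reply, V}, {R, Acc}) when length(Acc) + 1 >= R ->
    {next_state, finished, {R, [V | Acc]}};
waiting(cast, {reply, V}, {R, Acc}) ->
    {keep_state, {R, [V | Acc]}};
waiting({call, From}, result, _Data) ->
    {keep_state_and_data, [{reply, From, not_ready}]}.

finished({call, From}, result, {_R, Acc}) ->
    {keep_state_and_data, [{reply, From, {ok, lists:reverse(Acc)}}]};
finished(cast, {reply, _Late}, _Data) ->
    keep_state_and_data.   % ignore replies that arrive too late
\end{verbatim}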

\end{aosasect2}

\begin{aosasect2}{Event Handlers}

Event handlers and managers are another behavior implemented in the
\code{gen\_event} library module. The idea is to create a centralized
point that receives events of a specific kind. Events can be sent
synchronously and asynchronously with a predefined set of actions
being applied when they are received. Possible responses to events
include logging them to file, sending off an alarm in the form of an
SMS, or collecting statistics. Each of these actions is defined in a
separate callback module with its own loop data, preserved between
calls. Handlers can be added, removed, or updated for every specific
event manager. So, in practice, for every event manager there could
be many callback modules, and different instances of these callback
modules could exist in different managers. Typical event handlers deal
with alarms, live trace data, equipment-related events, or simple
logs.
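A minimal event handler might look like the sketch below. The handler
module \code{collect\_handler} is hypothetical; its loop data is
simply the list of events received so far, and a synchronous
\code{gen\_event:call/3} retrieves them. The event manager itself is
started with \code{gen\_event:start/0} and needs no callback module of
its own:

\begin{verbatim}
-module(collect_handler).
-behavior(gen_event).
-export([init/1, handle_event/2, handle_call/2, handle_info/2,
         terminate/2]).

%% Handler loop data: events seen so far, newest first.
init([]) -> {ok, []}.

%% Invoked for every notify/2 or sync_notify/2 sent to the manager.
handle_event(Event, Seen) -> {ok, [Event | Seen]}.

%% Reached via gen_event:call(Manager, collect_handler, get_events).
handle_call(get_events, Seen) -> {ok, lists:reverse(Seen), Seen}.

handle_info(_Info, Seen) -> {ok, Seen}.

terminate(_Arg, _Seen) -> ok.
\end{verbatim}

\noindent With a manager \code{Mgr} started by \code{gen\_event:start()},
\code{gen\_event:add\_handler(Mgr, collect\_handler, {[}{]})} installs
the handler, and every subsequent notification sent to \code{Mgr}
invokes its \code{handle\_event/2}. Other handlers added to the same
manager each receive the event with their own loop data.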

One use of the \code{gen\_event} behavior in Riak is managing
subscriptions to ``ring events'', i.e., changes to the
membership or partition assignment of a Riak cluster.  Processes on a
Riak node can register a function in an instance of
\code{riak\_core\_ring\_events}, which implements the
\code{gen\_event} behavior.  Whenever the central process managing the
ring for that node changes the membership record for the overall
cluster, it fires off an event that causes each of those callback
modules to call the registered function.  In this fashion, it is
easy for various parts of Riak to respond to changes in one of Riak's
most central data structures without having to add complexity to the
central management of that structure.
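The sketch below illustrates this pattern. Each registered function is added as a separate handler instance, identified by a \code{\{Module, Ref\}} tuple, so a single callback module can be installed many times in the same manager. The module \code{hyp\_ring\_events} and its functions are hypothetical, and \code{riak\_core\_ring\_events} differs in its details.

\begin{verbatim}
%% Hypothetical ring-event manager in the spirit of
%% riak_core_ring_events: every registered fun becomes its own
%% gen_event handler instance, identified by {?MODULE, Ref}.
-module(hyp_ring_events).
-behavior(gen_event).

-export([start_link/0, add_callback/2, ring_update/2]).
-export([init/1, handle_event/2, handle_call/2, handle_info/2,
         terminate/2, code_change/3]).

start_link() ->
    gen_event:start_link().

%% Register Fun so that it is called with every new ring.
add_callback(Mgr, Fun) when is_function(Fun, 1) ->
    gen_event:add_handler(Mgr, {?MODULE, make_ref()}, Fun).

%% Called by the (hypothetical) ring manager when the ring changes;
%% sync_notify returns once every handler has processed the event.
ring_update(Mgr, Ring) ->
    gen_event:sync_notify(Mgr, {ring_update, Ring}).

%% The handler's loop data is simply the registered fun.
init(Fun) -> {ok, Fun}.

handle_event({ring_update, Ring}, Fun) ->
    Fun(Ring),
    {ok, Fun};
handle_event(_Other, Fun) ->
    {ok, Fun}.

handle_call(_Request, Fun) -> {ok, ok, Fun}.
handle_info(_Info, Fun) -> {ok, Fun}.
terminate(_Arg, _Fun) -> ok.
code_change(_OldVsn, Fun, _Extra) -> {ok, Fun}.
\end{verbatim}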

Most common concurrency and communication patterns are handled with
the three primary behaviors we've just discussed: \code{gen\_server},
\code{gen\_fsm}, and \code{gen\_event}.  However, in large systems,
some application-specific patterns emerge over time that warrant the
creation of new behaviors.  Riak includes one such behavior,
\code{riak\_core\_vnode}, which formalizes how virtual nodes are
implemented.  Virtual nodes are the primary storage abstraction in
Riak, exposing a uniform interface for key-value storage to the
request-driving FSMs.  The interface for callback modules is specified
using the \code{behaviour\_info/1} function (the compiler only looks for
the British spelling of this function name, although it accepts both
spellings of the \code{-behavior} attribute), as follows:

\begin{verbatim}
behaviour_info(callbacks) ->
    [{init,1},
     {handle_command,3},
     {handoff_starting,2},
     {handoff_cancelled,1},
     {handoff_finished,2},
     {handle_handoff_command,3},
     {handle_handoff_data,2},
     {encode_handoff_item,2},
     {is_empty,1},
     {terminate,2},
     {delete,1}];
behaviour_info(_Other) ->
    undefined.
\end{verbatim}

\noindent The above example shows the \code{behaviour\_info/1} function from
\code{riak\_core\_vnode}.  The list of \code{\{CallbackFunction,
Arity\}} tuples defines the contract that callback modules must
follow.  Concrete virtual node implementations must export these
functions, or the compiler will emit a warning. Implementing your own
OTP behaviors is relatively straightforward. Alongside defining your
callback functions, you need to use the \code{proc\_lib} and
\code{sys} modules to start your processes with particular functions,
handle system messages, and monitor the parent in case it terminates.
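As a rough illustration of those requirements, the hypothetical module \code{hyp\_counter} below implements a minimal ``special process'': a counter that is started with \code{proc\_lib}, handles system messages through \code{sys:handle\_system\_msg/6}, and exits when its parent does. It sketches the mechanics that a generic behavior module builds on. It is not code from Riak, and debug tracing through \code{sys:handle\_debug/4} is left out for brevity.

\begin{verbatim}
%% Hypothetical minimal special process (not from Riak): started with
%% proc_lib, answers sys system messages, and exits with its parent.
-module(hyp_counter).
-export([start_link/0, incr/1]).
%% Callbacks used by proc_lib and sys:handle_system_msg/6.
-export([init/1, system_continue/3, system_terminate/4,
         system_code_change/4]).

start_link() ->
    %% Blocks until init/1 calls proc_lib:init_ack/2.
    proc_lib:start_link(?MODULE, init, [self()]).

incr(Pid) ->
    Pid ! incr,
    ok.

init(Parent) ->
    %% Trap exits so an EXIT from the parent arrives as a message.
    process_flag(trap_exit, true),
    Deb = sys:debug_options([]),
    proc_lib:init_ack(Parent, {ok, self()}),
    loop(Parent, Deb, 0).

loop(Parent, Deb, Count) ->
    receive
        {system, From, Request} ->
            %% sys takes over and later calls system_continue/3
            %% (or system_terminate/4) with our loop state.
            sys:handle_system_msg(Request, From, Parent, ?MODULE, Deb, Count);
        {'EXIT', Parent, Reason} ->
            %% Our parent (e.g. a supervisor) died: terminate too.
            exit(Reason);
        incr ->
            loop(Parent, Deb, Count + 1)
    end.

system_continue(Parent, Deb, Count) ->
    loop(Parent, Deb, Count).

system_terminate(Reason, _Parent, _Deb, _Count) ->
    exit(Reason).

system_code_change(Count, _Module, _OldVsn, _Extra) ->
    {ok, Count}.
\end{verbatim}

\noindent Because the process answers system messages, standard tools work on it unchanged: \code{sys:get\_state/1} returns the counter value, while \code{sys:suspend/1} and \code{sys:resume/1} pause and continue it.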

\end{aosasect2}

\end{aosasect1}

\begin{aosasect1}{Supervisors}

The supervisor behavior's task is to monitor its children and, based
on some preconfigured rules, take action when they terminate. Children
consist of both supervisors and worker processes; worker processes are
OTP behaviors such as \code{gen\_fsm}, \code{gen\_server}, and
\code{gen\_event}. Because supervisors handle software bugs, corrupt
data, and system errors in a consistent way across the whole system,
the Riak codebase can focus on the correct case. In the Erlang world,
this non-defensive programming approach is often referred to as the
``let it crash'' strategy. Not having to handle borderline error
cases, the Riak team gets to work with a smaller code base. This code
base is also smaller to start off with, because its use of behaviors
means it only deals with application-specific code. Riak has a
top-level supervisor like most Erlang applications, and also has
sub-supervisors for groups of processes with related
responsibilities.  Examples include Riak's virtual nodes, TCP socket
listeners, and query-response managers.

\begin{aosasect2}{Supervisor Callback Functions}

To demonstrate how the supervisor behavior is implemented, we will use
the \code{riak\_core\_sup.erl} module. The Riak core supervisor is the
top-level supervisor of the Riak core application. It starts a set of
static workers and supervisors, together with a dynamic number of
workers handling the HTTP and HTTPS bindings of the node's RESTful
API, as defined in application-specific configuration files. In a
similar way to \code{gen\_servers}, all supervisor callback modules
must include the \code{-behavior(supervisor).} directive. They are
started using \code{supervisor:start\_link}, which takes the optional
\code{ServerName}, the \code{CallBackModule}, and an \code{Argument}
which is passed to the \code{init/1} callback function.

Looking at the first few lines of code in the
\code{riak\_core\_sup.erl} module, alongside the behavior directive
and a macro we will describe later, we notice the \code{start\_link/0}
function, which calls \code{supervisor:start\_link/3}:

\begin{verbatim}
-module(riak_core_sup).
-behavior(supervisor).
%% API
-export([start_link/0]).
%% Supervisor callbacks
-export([init/1]).
-define(CHILD(I, Type), {I, {I, start_link, []}, permanent, 5000, Type, [I]}).
start_link() ->
    supervisor:start_link({local, ?MODULE}, ?MODULE, []).
\end{verbatim}

\noindent Starting a supervisor will result in a new process being spawned, and
the \code{init/1} callback function being called in the callback
module \code{riak\_core\_sup.erl}. The \code{ServerName} is a tuple of
the format \code{\{local, Name\}} or \code{\{global, Name\}}, where
\code{Name} is the supervisor's registered name. In our example, both
the registered name and the callback module are the atom
\code{riak\_core\_sup}, originating from the \code{?MODULE} macro. We
pass the empty list as an argument to \code{init/1}, treating it as a
null value. The \code{init} function is the only supervisor callback
function. It has to return a tuple with format:

\begin{verbatim}
{ok,  {SupervisorSpecification, ChildSpecificationList}}
\end{verbatim}

\noindent where \code{SupervisorSpecification} is a 3-tuple
\code{\{RestartStrategy, AllowedRestarts, MaxSeconds\}} containing
information on how to handle process crashes and
restarts. \code{RestartStrategy} is one of three values determining
how the siblings of a terminating child are affected when the
supervisor restarts it:

\begin{aosaitemize}

  \item \code{one\_for\_one}: other processes in the supervision tree
  are not affected.

  \item \code{rest\_for\_one}: processes started after the terminating
  process are terminated and restarted.

  \item \code{one\_for\_all}: all processes are terminated and restarted.

\end{aosaitemize}

\code{AllowedRestarts} states how many times any of the supervisor's
children may terminate within \code{MaxSeconds} before the supervisor
terminates itself (and its children).  When a child terminates, it
sends an EXIT signal to its supervisor, which handles the termination
according to its restart strategy. Terminating the supervisor once
the maximum number of allowed restarts has been reached ensures that
cyclic restarts and other issues that cannot be resolved at this
level are escalated. Chances are that the issue is in a process
located in a different sub-tree, allowing the supervisor receiving
the escalation to terminate the affected sub-tree and restart it.

Examining the last line of the \code{init/1} callback function in the
\code{riak\_core\_sup.erl} module, we notice that this particular
supervisor has a one-for-one strategy, meaning that the processes are
independent of each other. The supervisor will allow a maximum of ten
restarts within ten seconds before terminating itself.
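The hypothetical module \code{hyp\_sup} below sketches this setup in a single file: a \code{one\_for\_one} supervisor with the same restart intensity as \code{riak\_core\_sup}, and one permanent worker that exits when it receives the atom \code{crash}. To keep the example self-contained, the worker is a bare \code{proc\_lib} process rather than an OTP behavior, which a real supervision tree should avoid. The helper \code{await\_new\_pid/2} exists only to make the restart observable.

\begin{verbatim}
%% Hypothetical demo: one_for_one supervisor allowing 10 restarts in
%% 10 seconds, with a trivial worker that crashes on request.
-module(hyp_sup).
-behavior(supervisor).
-export([start_link/0, start_worker/0, await_new_pid/2]).
-export([init/1]).

start_link() ->
    supervisor:start_link({local, ?MODULE}, ?MODULE, []).

init([]) ->
    Worker = {hyp_worker, {?MODULE, start_worker, []},
              permanent, 5000, worker, [?MODULE]},
    {ok, {{one_for_one, 10, 10}, [Worker]}}.

%% Simplification: a bare linked proc_lib process stands in for a
%% real gen_server. It dies with reason 'boom' when sent 'crash'.
start_worker() ->
    {ok, proc_lib:spawn_link(fun worker_loop/0)}.

worker_loop() ->
    receive
        crash -> exit(boom);
        _Other -> worker_loop()
    end.

%% Poll which_children/1 until the worker has a pid other than Old.
await_new_pid(Sup, Old) ->
    [{hyp_worker, Pid, worker, _}] = supervisor:which_children(Sup),
    case is_pid(Pid) andalso Pid =/= Old of
        true -> Pid;
        false -> timer:sleep(10), await_new_pid(Sup, Old)
    end.
\end{verbatim}

\noindent Crashing the worker leaves the supervisor running with a fresh worker process; only an eleventh crash within ten seconds would make the supervisor give up and escalate.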

\code{ChildSpecificationList} specifies which children the supervisor
has to start and monitor, together with information on how to
terminate and restart them. It consists of a list of tuples of the
following format:

\begin{verbatim}
{Id, {Module, Function, Arguments}, Restart, Shutdown, Type, ModuleList}
\end{verbatim}

\code{Id} is a unique identifier for the child within that particular
supervisor. \code{Module}, \code{Function}, and \code{Arguments}
specify an exported function which results in the behavior's
\code{start\_link} function being called, returning a tuple of the
format \code{\{ok, Pid\}}. The \code{Restart} type dictates what
happens depending on how the process terminates; a child can be one
of:

\begin{aosaitemize}

  \item \code{temporary} processes, which are never restarted;

  \item \code{transient} processes, which are restarted only if they
  terminate abnormally; and

  \item \code{permanent} processes, which are always restarted, regardless of
  whether the termination was normal or abnormal.

\end{aosaitemize}
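The three restart types can be summarized as a small decision function. The module \code{hyp\_restart} is a hypothetical restatement of the documented rules, not the \code{supervisor} module's implementation. Note that for \code{transient} children, the exit reasons \code{shutdown} and \code{\{shutdown, Term\}} count as normal termination alongside \code{normal}.

\begin{verbatim}
%% Hypothetical model of the restart decision for a terminated child,
%% restating the documented rules (not the supervisor module's code).
-module(hyp_restart).
-export([should_restart/2]).

-spec should_restart(permanent | transient | temporary, term()) -> boolean().
should_restart(permanent, _Reason) ->
    true;                              % always restarted
should_restart(temporary, _Reason) ->
    false;                             % never restarted
should_restart(transient, Reason) ->
    not is_normal_exit(Reason).        % only after abnormal exits

%% Exit reasons that count as a normal termination.
is_normal_exit(normal) -> true;
is_normal_exit(shutdown) -> true;
is_normal_exit({shutdown, _}) -> true;
is_normal_exit(_) -> false.
\end{verbatim}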

\code{Shutdown} is a value in milliseconds referring to the time the
behavior is allowed to execute in the \code{terminate} function when
terminating as the result of a restart or shutdown. The atom
\code{infinity} can also be used, but for behaviors other than
supervisors, it is highly discouraged. \code{Type} is either the atom
\code{worker}, referring to the generic servers, event handlers and
finite state machines, or the atom \code{supervisor}. Together with
\code{ModuleList}, a list of modules implementing the behavior, they
are used to control and suspend processes during runtime software
upgrades. Only existing or user-implemented behaviors may be part of
the child specification list and hence included in a supervision
tree.

With this knowledge at hand, we should now be able to formulate a
restart strategy defining inter-process dependencies, fault tolerance
thresholds and escalation procedures based on a common
architecture. We should also be able to understand what is going on in
the \code{init/1} example of the \code{riak\_core\_sup.erl}
module. First of all, study the \code{CHILD} macro. It creates the
child specification for one child, using the callback module name as
\code{Id}, making it permanent and giving it a shutdown time of 5
seconds. The \code{Type} argument specifies whether the child is a
worker or a supervisor. Have a look at the example, and see what you
can make out of it:

\begin{verbatim}
-define(CHILD(I, Type), {I, {I, start_link, []}, permanent, 5000, Type, [I]}).

init([]) ->
    RiakWebs = case lists:flatten(riak_core_web:bindings(http),
                                  riak_core_web:bindings(https)) of
                   [] ->
                       %% check for old settings, in case app.config
                       %% was not updated
                       riak_core_web:old_binding();
                   Binding ->
                       Binding
               end,

    Children =
                 [?CHILD(riak_core_vnode_sup, supervisor),
                  ?CHILD(riak_core_handoff_manager, worker),
                  ?CHILD(riak_core_handoff_listener, worker),
                  ?CHILD(riak_core_ring_events, worker),
                  ?CHILD(riak_core_ring_manager, worker),
                  ?CHILD(riak_core_node_watcher_events, worker),
                  ?CHILD(riak_core_node_watcher, worker),
                  ?CHILD(riak_core_gossip, worker) |
                  RiakWebs
                 ],
    {ok, {{one_for_one, 10, 10}, Children}}.
\end{verbatim}

Most of the \code{Children} started by this supervisor are statically
defined workers (or, in the case of \code{riak\_core\_vnode\_sup}, a
supervisor).  The exception is the \code{RiakWebs} portion, which is
defined dynamically from the HTTP and HTTPS bindings in Riak's
configuration file.
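Since Erlang/OTP 18, supervisors also accept maps for both the supervisor flags and the child specifications, which makes the fields of the \code{CHILD} macro self-describing. The hypothetical module \code{hyp\_child\_spec} below expresses the same child specification as a function returning such a map. The module and function names are illustrative and are not part of Riak.

\begin{verbatim}
%% Hypothetical map-based equivalent of the CHILD macro and of the
%% {one_for_one, 10, 10} flags used in riak_core_sup.
-module(hyp_child_spec).
-export([child/2, sup_flags/0]).

child(I, Type) ->
    #{id => I,                          % unique within this supervisor
      start => {I, start_link, []},     % {Module, Function, Arguments}
      restart => permanent,
      shutdown => 5000,                 % milliseconds
      type => Type,                     % worker | supervisor
      modules => [I]}.

sup_flags() ->
    #{strategy => one_for_one, intensity => 10, period => 10}.
\end{verbatim}

\noindent An \code{init/1} callback could then return \code{\{ok, \{hyp\_child\_spec:sup\_flags(), Children\}\}}, where \code{Children} is a list built with \code{child/2}.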

With the exception of library applications, every OTP application,
including those in Riak, has its own supervision tree. In
Riak, various top-level applications are running in the Erlang node,
such as \code{riak\_core} for distributed systems algorithms,
\code{riak\_kv} for key/value storage semantics, \code{webmachine} for
HTTP, and more.  We have shown the expanded tree under
\code{riak\_core} to demonstrate the multi-level supervision going on.
One of the many benefits of this structure is that a given subsystem
can be crashed (due to a bug, an environmental problem, or intentional
action) and only that subtree will initially be terminated.

The supervisor will restart the needed processes and the overall
system will not be affected. In practice we have seen this work
well for Riak.  A user might figure out how to crash a virtual node,
but it will just be restarted by \code{riak\_core\_vnode\_sup}.  If
they manage to crash that supervisor, \code{riak\_core\_sup} will
restart it; only repeated failures beyond the allowed restart
intensity propagate the termination further up the tree.
This failure isolation and recovery mechanism allows Riak (and Erlang)
developers to straightforwardly build resilient systems.

The value of the supervisory model was shown when one large industrial
user created a very abusive environment in order to find out where
each of several database systems would fall apart.  This environment
created random huge bursts of both traffic and failure conditions.
They were confused when Riak simply wouldn't stop running, even under
the worst such arrangement.  Under the covers, of course, they were
able to make individual processes or subsystems crash in multiple
ways---but the supervisors would clean up and restart things to put
the whole system back into working order every time.

\end{aosasect2}

\begin{aosasect2}{Applications}

The \code{application} behavior we previously introduced is used to
package Erlang modules and resources into reusable components. In OTP,
there are two kinds of applications. The most common form, called
normal applications, will start a supervision tree and all of the
relevant static workers. Library applications such as the Standard
Library, which come as part of the Erlang distribution, contain
library modules but do not start a supervision tree. This is not to
say that the code may not contain processes or supervision trees. It
just means they are started as part of a supervision tree belonging to
another application.
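A normal application is tied to its supervision tree through an application callback module whose \code{start/2} function starts the top-level supervisor. In the hypothetical sketch below, one module, \code{hyp\_app}, plays both roles to stay self-contained. In a real application such as \code{riak\_core} the supervisor lives in its own module, and the application resource file (the \code{.app} file, not shown) names the callback module in its \code{mod} entry.

\begin{verbatim}
%% Hypothetical application callback module that, for brevity, is
%% also its own top-level supervisor (real applications separate them).
-module(hyp_app).
-behavior(application).
-behavior(supervisor).

-export([start/2, stop/1]).   % application callbacks
-export([init/1]).            % supervisor callback

%% Called when the application starts; must return {ok, Pid} of the
%% top-level supervisor, the root of the application's tree.
start(_StartType, _StartArgs) ->
    supervisor:start_link({local, hyp_app_sup}, ?MODULE, []).

stop(_State) ->
    ok.

init([]) ->
    %% No children in this sketch; a real application lists its
    %% static workers and sub-supervisors here.
    {ok, {#{strategy => one_for_one, intensity => 10, period => 10}, []}}.
\end{verbatim}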

An Erlang system will consist of a set of loosely coupled
applications. Some are written by the developers, some are available
as open source, and others are part of the Erlang/OTP
distribution. The Erlang runtime system and its tools treat all
applications equally, regardless of whether they are part of the
Erlang distribution or not.

\end{aosasect2}

\end{aosasect1}

\begin{aosasect1}{Replication and Communication in Riak}

Riak was designed for extreme reliability and availability at a
massive scale, and was inspired by Amazon's Dynamo storage system
\cite{bib:amazon:dynamo}.  The architectures of Dynamo and Riak combine
aspects of both Distributed Hash Tables (DHTs) and traditional
databases.  Two key techniques that both Riak and Dynamo use are
\emph{consistent hashing} for replica placement and a \emph{gossip
protocol} for sharing common state.

Consistent hashing requires that all nodes in the system know about
each other, and know what partitions each node owns.  This assignment
data could be maintained in a centrally managed configuration file,
but in large configurations, this becomes extremely difficult. Another
alternative is to use a central configuration server, but this
introduces a single point of failure in the system. Instead, Riak uses
a gossip protocol to propagate cluster membership and partition
ownership data throughout the system.

Gossip protocols, also called epidemic protocols, work exactly as they
sound.  When a node in the system wishes to change a piece of shared
data, it makes the change to its local copy of the data and gossips
the updated data to a random peer.  Upon receiving an update, a node
merges the received changes with its local state and gossips again to
another random peer.
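A single gossip step can be sketched as merging two copies of the shared state and then choosing a random peer. In the hypothetical module \code{hyp\_gossip}, the state is a map from keys to \code{\{Version, Value\}} pairs, and the merge keeps the higher version of each entry. This last-version-wins rule is an assumption made for illustration; reconciling truly concurrent changes needs something stronger, such as the vector clocks Riak uses.

\begin{verbatim}
%% Hypothetical gossip helpers: versioned map entries, merged by
%% keeping the higher version, plus random peer selection.
-module(hyp_gossip).
-export([update/3, merge/2, pick_peer/2]).

%% Local change: store Value under Key with the next version number.
update(Key, Value, State) ->
    {Version, _Old} = maps:get(Key, State, {0, undefined}),
    State#{Key => {Version + 1, Value}}.

%% Merge a received copy into the local one, entry by entry: an
%% incoming entry wins only if its version is strictly higher.
merge(Local, Remote) ->
    maps:fold(fun(Key, {RVersion, _} = REntry, Acc) ->
                      case maps:find(Key, Acc) of
                          {ok, {LVersion, _}} when LVersion >= RVersion -> Acc;
                          _ -> Acc#{Key => REntry}
                      end
              end, Local, Remote).

%% Choose a random peer other than ourselves to gossip to next.
pick_peer(Self, Nodes) ->
    case lists:delete(Self, Nodes) of
        [] -> none;
        Peers -> lists:nth(rand:uniform(length(Peers)), Peers)
    end.
\end{verbatim}

\noindent Equal versions carrying different values are not detected here; that is exactly the situation vector clocks are designed to expose.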

When a Riak cluster is started, all nodes must be configured with the
same partition count. The consistent hashing ring is then divided into
that many equal intervals, and each interval is stored locally as a
\code{\{HashRange, Owner\}} pair. The first node in a cluster simply
claims all the partitions.  When a new node joins the cluster, it
contacts an existing node for its list of \code{\{HashRange, Owner\}}
pairs.  It then claims (partition count)/(number of nodes) pairs,
updating its local state to reflect its new ownership. The updated
ownership information is then gossiped to a peer, and from there it
spreads throughout the entire cluster using the above algorithm.
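The hypothetical module \code{hyp\_ring} below follows these steps. The 160-bit hash space (the size of a SHA-1 digest) is split into equal ranges stored as \code{\{RangeStart, Owner\}} pairs, and the first node claims them all. A joining node takes (partition count) \code{div} (number of nodes) partitions, one at a time, from whichever node currently owns the most. That stealing rule is an assumption for illustration; Riak's actual claim algorithm is more involved.

\begin{verbatim}
%% Hypothetical ring creation and naive claim. Each entry is
%% {RangeStart, Owner}; ranges split the 2^160 space equally.
-module(hyp_ring).
-export([fresh/2, claim/2, owners/1]).

-define(RING_TOP, (1 bsl 160)).   % size of the SHA-1 hash space

%% The first node in a cluster owns every partition.
fresh(NumPartitions, Node) ->
    Inc = ?RING_TOP div NumPartitions,
    [{I * Inc, Node} || I <- lists:seq(0, NumPartitions - 1)].

%% NewNode (assumed not yet a member) takes its share of partitions.
claim(Ring, NewNode) ->
    Nodes = lists:usort([NewNode | owners(Ring)]),
    Want = length(Ring) div length(Nodes),
    claim(Ring, NewNode, Want).

claim(Ring, _NewNode, 0) ->
    Ring;
claim(Ring, NewNode, Want) ->
    Busiest = busiest_owner(Ring, NewNode),
    {Idx, Busiest} = lists:keyfind(Busiest, 2, Ring),
    claim(lists:keyreplace(Idx, 1, Ring, {Idx, NewNode}), NewNode, Want - 1).

%% The owner, other than NewNode, holding the most partitions.
busiest_owner(Ring, NewNode) ->
    Counts = lists:foldl(
               fun({_Idx, O}, Acc) when O =/= NewNode ->
                       maps:update_with(O, fun(C) -> C + 1 end, 1, Acc);
                  (_, Acc) ->
                       Acc
               end, #{}, Ring),
    {Owner, _Count} = lists:last(lists:keysort(2, maps:to_list(Counts))),
    Owner.

owners(Ring) ->
    lists:usort([Owner || {_, Owner} <- Ring]).
\end{verbatim}

\noindent With eight partitions, a second node claims four of them and a third claims two, matching the (partition count)/(number of nodes) rule with integer division.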

By using a gossip protocol, Riak avoids introducing a single point of
failure in the form of a centralized configuration server, relieving
system operators from having to maintain critical cluster
configuration data.  Any node can then use the gossiped partition
assignment data in the system to route requests.  When used together,
the gossip protocol and consistent hashing enable Riak to function as
a truly decentralized system, which has important consequences for
deploying and operating large-scale systems.
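Routing with the gossiped ring can be sketched in a few lines. The hypothetical module \code{hyp\_route} hashes a key with SHA-1 into the 160-bit space, finds the partition whose range contains it, and returns that partition's owner. It assumes the ring is a list of \code{\{RangeStart, Owner\}} pairs sorted by range start. Selecting additional replicas by walking further around the ring is omitted.

\begin{verbatim}
%% Hypothetical routing sketch: SHA-1 hash of the key, mapped to one
%% of NumPartitions equal ranges, then looked up in the ring.
-module(hyp_route).
-export([partition_index/2, owner/2]).

-define(RING_TOP, (1 bsl 160)).

%% Position of Key on the ring, an integer in [0, 2^160).
key_position(Key) ->
    <<Pos:160/integer>> = crypto:hash(sha, term_to_binary(Key)),
    Pos.

%% 0-based partition containing Key; the last range absorbs any
%% remainder when NumPartitions does not divide 2^160 exactly.
partition_index(Key, NumPartitions) ->
    min(key_position(Key) div (?RING_TOP div NumPartitions),
        NumPartitions - 1).

%% Ring: [{RangeStart, Owner}] sorted by RangeStart.
owner(Key, Ring) ->
    {_Start, Owner} = lists:nth(partition_index(Key, length(Ring)) + 1, Ring),
    Owner.
\end{verbatim}

\noindent Because every node holds the same gossiped ring, any node computes the same owner for a given key without consulting a central server.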

\end{aosasect1}

\begin{aosasect1}{Conclusions and Lessons Learned}

Most programmers believe that smaller and simpler codebases are not
only easier to maintain but also tend to have fewer bugs.  By using
Erlang's basic distribution primitives for communication in a cluster,
Riak can start out with a fundamentally sound asynchronous messaging
layer and build its own protocols without having to worry about that
underlying implementation. As Riak grew into a mature system, some
aspects of its networked communication moved away from use of Erlang's
built-in distribution (and toward direct manipulation of TCP sockets)
while others remained a good fit for the included primitives.  By
starting out with Erlang's native message passing for everything, the
Riak team was able to build out the whole system very quickly.  These
primitives are clean and clear enough that it was still easy later to
replace the few places where they turned out to not be the best fit in
production.

Also, due to the nature of Erlang messaging and the lightweight core
of the Erlang VM, a user can just as easily run 12 nodes on 1 machine
or 12 nodes on 12 machines. This makes development and testing much
easier when compared to more heavyweight messaging and clustering
mechanisms. This has been especially valuable due to Riak's
fundamentally distributed nature. Historically, most distributed
systems are very difficult to operate in a ``development mode'' on a
single developer's laptop. As a result, developers often end up
testing their code in an environment that is a subset of their full
system, with very different behavior. Since a many-node Riak cluster
can be trivially run on a single laptop without excessive resource
consumption or tricky configuration, the development process can more
easily produce code that is ready for production deployment.

The use of Erlang/OTP supervisors makes Riak much more resilient in
the face of subcomponent crashes. Riak takes this further: inspired by
the same approach, a Riak cluster is also able to keep functioning
even when whole nodes crash and disappear from the system. This can
lead to a sometimes-surprising level of resilience.  One example of
this was when a large enterprise was stress-testing various databases
and intentionally crashing them to observe their edge conditions.
When they got to Riak, they became confused.  Each time they would
find a way (through OS-level manipulation, bad IPC, etc.) to crash a
subsystem of Riak, they would see a very brief dip in performance and
then the system returned to normal behavior. This is a direct result
of a thoughtful ``let it crash'' approach. Riak was cleanly restarting
each of these subsystems on demand, and the overall system simply
continued to function. That experience shows exactly the sort of
resilience enabled by Erlang/OTP's approach to building programs.

\begin{aosasect2}{Acknowledgments}

This chapter is based on Francesco Cesarini and Simon Thompson's 2009
lecture notes from the Central European Functional Programming School
held in Budapest and Kom\'{a}rno. Major contributions were made by
Simon Thompson of the University of Kent in Canterbury, UK. A special
thank you goes to all of the reviewers, who at different stages in the
writing of this chapter provided valuable feedback.

\end{aosasect2}

\end{aosasect1}

\end{aosachapter}
